Deal Ingestion System
Complete multi-source deal ingestion system with AI-powered analysis and metrics generation.
Overview
The Deal Ingestion System allows you to import deals from 7 different sources, automatically analyze them with AI, and generate actionable metrics.
Supported Sources
1. Manual Entry
- **Use Case:** Sales team manually entering deals
- **Features:**
  - Full deal details (name, company, value, stage, probability)
  - Expected close date
  - Contact information
  - Description/notes
2. Web Forms
- **Use Case:** Website contact forms, demo requests
- **Features:**
  - Automatic value estimation based on company size
  - Lead qualification
  - UTM tracking
  - Product interest capture
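
As a rough illustration of value estimation from company size, the sketch below maps a size band to an estimated deal value. The band labels (other than `51-200`, which appears in the web form example in this document), the dollar figures, the fallback value, and the function name `estimate_deal_value` are all placeholders chosen for illustration; the service's actual estimates are not documented here.

```python
from typing import Optional

# Placeholder figures for illustration only; the real estimates are not documented.
ESTIMATED_VALUE_BY_SIZE = {
    "1-10": 5_000.0,
    "11-50": 15_000.0,
    "51-200": 40_000.0,
    "201-1000": 100_000.0,
    "1000+": 250_000.0,
}
FALLBACK_VALUE = 10_000.0  # hypothetical default for unknown or missing sizes


def estimate_deal_value(company_size: Optional[str]) -> float:
    """Return an estimated deal value for a web-form company size band."""
    if not company_size:
        return FALLBACK_VALUE
    return ESTIMATED_VALUE_BY_SIZE.get(company_size.strip(), FALLBACK_VALUE)
```

A lookup table keeps the estimate easy to audit and adjust without touching the ingestion logic.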
3. CSV Upload
- **Use Case:** Bulk imports from other systems
- **Supported Columns:**
  - Deal Name / name (required)
  - Company / company / Account
  - Value / value / Amount
  - Stage / stage / Stage Name
  - Probability / probability
  - Expected Close Date / close_date
  - Contact Email / email
  - Contact Name / contact
  - Description / description
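
The column aliases above could be resolved with a simple reverse lookup, as in this minimal sketch. The canonical field names follow the manual-entry payload shown later in this document; the function name `parse_deals_csv` and the choice to skip rows that lack a deal name are assumptions, not the service's confirmed behavior.

```python
import csv
import io
from typing import Dict, List

# Accepted header spellings, taken from the supported column list.
COLUMN_ALIASES = {
    "name": ["Deal Name", "name"],
    "company": ["Company", "company", "Account"],
    "value": ["Value", "value", "Amount"],
    "stage": ["Stage", "stage", "Stage Name"],
    "probability": ["Probability", "probability"],
    "expected_close_date": ["Expected Close Date", "close_date"],
    "contact_email": ["Contact Email", "email"],
    "contact_name": ["Contact Name", "contact"],
    "description": ["Description", "description"],
}
# Reverse lookup: header spelling -> canonical field name.
HEADER_TO_FIELD = {
    alias: field for field, aliases in COLUMN_ALIASES.items() for alias in aliases
}


def parse_deals_csv(text: str) -> List[Dict[str, str]]:
    """Map each CSV row onto canonical field names; skip rows without a name."""
    deals = []
    for row in csv.DictReader(io.StringIO(text)):
        deal = {}
        for header, cell in row.items():
            field = HEADER_TO_FIELD.get((header or "").strip())
            if field and cell and cell.strip():
                deal[field] = cell.strip()
        if "name" in deal:  # the deal name is the only required column
            deals.append(deal)
    return deals
```

Cell values are left as strings here; converting `value` and `probability` to numbers would belong to a later normalization step.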
4. Email Leads
- **Use Case:** Extracting deals from incoming emails
- **Features:**
  - Automatic company detection
  - Subject/content analysis
  - Sender information capture
  - Default value estimation ($15,000)
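
One simple way to approximate company detection is to fall back to the sender's email domain when no company name is supplied. The sketch below is a guess at that idea, not the service's actual logic: the heuristic, the free-mail domain list, the `"email"` source label, and the function names are all assumptions. Only the $15,000 default comes from this document.

```python
from typing import Dict, Optional

DEFAULT_EMAIL_DEAL_VALUE = 15_000.0  # default value stated in this document
FREE_MAIL_DOMAINS = {"gmail.com", "outlook.com", "yahoo.com"}  # illustrative, not exhaustive


def detect_company(sender: str, company_name: Optional[str] = None) -> Optional[str]:
    """Prefer an explicit company name; otherwise guess from the sender's domain."""
    if company_name:
        return company_name
    _, at, domain = sender.rpartition("@")
    domain = domain.lower().strip()
    if not at or not domain or domain in FREE_MAIL_DOMAINS:
        return None
    # Use the first domain label, e.g. "acme-corp.com" becomes "Acme Corp".
    return domain.split(".")[0].replace("-", " ").title()


def email_to_deal(sender: str, subject: str, company_name: Optional[str] = None) -> Dict[str, object]:
    """Build a deal payload from an email, using the subject as the deal name."""
    return {
        "name": subject,
        "company": detect_company(sender, company_name),
        "value": DEFAULT_EMAIL_DEAL_VALUE,
        "contact_email": sender,
        "source": "email",  # hypothetical source label
    }
```

The first-label heuristic misreads subdomains such as `mail.acme.com`, so a production version would need a more careful domain parser.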
5. API Integration
- **Use Case:** External systems, integrations
- **Features:**
  - RESTful API
  - Flexible data format
  - Metadata support
  - Bulk operations
6. HubSpot Sync
- **Use Case:** HubSpot CRM integration
- **Features:**
  - OAuth authentication
  - Real-time webhook sync
  - Batch import
  - Field mapping
7. Salesforce Sync
- **Use Case:** Salesforce CRM integration
- **Features:**
  - OAuth authentication
  - Opportunity sync
  - Real-time updates
  - Field mapping
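
Field mapping for CRM sync can be pictured as a dictionary from source field names to deal fields. The sketch below uses the Salesforce Opportunity fields that appear in the batch sync query in this document; the target field names, the `"salesforce"` source label, and the function name `map_opportunity` are assumptions, and the stage value is passed through without translation.

```python
from typing import Any, Dict

# Salesforce Opportunity field -> deal field (target names are assumed).
SALESFORCE_FIELD_MAP = {
    "Id": "external_id",
    "Name": "name",
    "Amount": "value",
    "StageName": "stage",
    "Probability": "probability",
    "CloseDate": "expected_close_date",
}


def map_opportunity(record: Dict[str, Any]) -> Dict[str, Any]:
    """Copy mapped, non-null fields from an Opportunity record into a deal dict."""
    deal = {
        target: record[source_field]
        for source_field, target in SALESFORCE_FIELD_MAP.items()
        if record.get(source_field) is not None
    }
    deal["source"] = "salesforce"  # hypothetical source label
    return deal
```

Unmapped keys, such as the `attributes` entry that Salesforce query results carry, are simply ignored.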
Backend Architecture
Core Service: `DealIngestionService`
Location: backend-saas/sales/deal_ingestion_service.py
**Key Methods:**
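```python
from typing import Any, Dict, Optional


async def ingest_deal(
    tenant_id: str,
    deal_data: Dict[str, Any],
    source: str,
    user_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    ...  # signature only; the body is omitted here
```
**Pipeline:**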
- **Normalization** - Convert source-specific format to standard
- **Deduplication** - Check for existing deals by name/company/external ID
- **Create/Update** - Create new or update existing deal
- **Lead Association** - Create/update associated lead
- **AI Analysis** - Run health scoring and generate insights
- **Metrics Generation** - Calculate deal percentiles and pipeline metrics
- **Activity Logging** - Log ingestion in activity feed
- **WebSocket Broadcast** - Real-time update to dashboard
- **Automation Trigger** - Trigger workflows for high-risk deals
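
The ordering of these steps can be sketched as a list of awaitable functions that share one context dictionary. Everything below is a placeholder: each step only records its name, and `make_step`, `PIPELINE`, and `run_pipeline` are hypothetical names rather than parts of `DealIngestionService`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Context = Dict[str, Any]
Step = Callable[[Context], Awaitable[None]]


def make_step(name: str) -> Step:
    """Build a placeholder step that only records that it ran."""
    async def step(ctx: Context) -> None:
        ctx["trace"].append(name)
    return step


# Step order mirrors the pipeline list above.
PIPELINE: List[Step] = [make_step(name) for name in (
    "normalize", "deduplicate", "create_or_update", "associate_lead",
    "ai_analysis", "metrics", "activity_log", "websocket_broadcast",
    "automation_trigger",
)]


async def run_pipeline(deal_data: Dict[str, Any], source: str) -> Context:
    """Run every step in order against a shared context."""
    ctx: Context = {"deal": dict(deal_data), "source": source, "trace": []}
    for step in PIPELINE:
        await step(ctx)
    return ctx


result = asyncio.run(run_pipeline({"name": "Enterprise License - Acme Corp"}, "manual"))
print(result["trace"])
```

Keeping the steps in a list makes the order explicit and lets a step be replaced or instrumented without touching the others.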
API Endpoints
Base path: /api/sales/deals
| Endpoint | Method | Description |
|---|---|---|
| /manual | POST | Create manual deal |
| /web-form | POST | Submit web form |
| /csv/upload | POST | Upload CSV file |
| /email | POST | Extract from email |
| /api | POST | API submission |
| /hubspot/sync | POST | Sync HubSpot deal |
| /hubspot/batch-sync | POST | Batch sync HubSpot |
| /salesforce/sync | POST | Sync Salesforce opportunity |
| /salesforce/batch-sync | POST | Batch sync Salesforce |
| /bulk | POST | Bulk import |
| /metrics | GET | Get deal metrics |
Frontend Components
DealIngestionPanel
Location: src/components/dashboards/DealIngestionPanel.tsx
**Features:**
- Tabbed interface for all sources
- Form validation
- Real-time feedback
- Success/error messages
- CSV upload with progress
- Integration documentation
Integration with SalesCommandCenter
The SalesCommandCenter now includes an "Add Deal" button that opens the DealIngestionPanel.
AI-Powered Analysis
Health Score Calculation
Based on multiple factors:
- **Velocity**: Days in current stage
- **Engagement**: Recent activity
- **Value**: Deal size vs probability
- **Risk Factors**: Stalled deals, low engagement
**Score Range:** 0-100
- **High Risk**: below 40
- **Medium Risk**: 40 to below 70
- **Healthy**: 70 and above
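
The score bands translate into a small classification function. This sketch assumes half-open boundaries, so a score of exactly 40 counts as medium risk and exactly 70 counts as healthy. The labels `"high"`, `"medium"`, and `"low"` and the helper names are illustrative; only `"low"` appears in this document, as the default `risk_level` on the deal model.

```python
def risk_level(health_score: float) -> str:
    """Classify a 0-100 health score into a risk label."""
    if not 0 <= health_score <= 100:
        raise ValueError("health_score must be between 0 and 100")
    if health_score < 40:
        return "high"
    if health_score < 70:
        return "medium"
    return "low"  # healthy deals carry low risk


def needs_automation(health_score: float) -> bool:
    """High-risk deals are the ones that trigger workflows in the pipeline."""
    return risk_level(health_score) == "high"
```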
AI Insights
Automatically generated using GPT-4:
- Risk factor or opportunity
- Recommended next action
- Competitive/market insight
Metrics Generated
For each deal:
- **Percentile**: Value ranking in pipeline
- **Value Rank**: Position by value
- **Total Active Deals**: Pipeline count
- **Stage Distribution**: Deals per stage
- **Pipeline Value**: Total and weighted
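
A minimal sketch of how these metrics might be computed from the list of active deals follows. The exact definitions are assumptions: percentile is taken as the share of active deals with a value at or below the given deal, rank 1 is the largest deal, and the weighted pipeline value multiplies each deal's value by its probability, which is treated as a 0-100 figure as in the deal model.

```python
from collections import Counter
from typing import Any, Dict, List


def deal_metrics(deal: Dict[str, Any], active_deals: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute per-deal and pipeline metrics; `deal` must be one of `active_deals`."""
    values = [d["value"] for d in active_deals]
    total = len(values)
    # Rank 1 is the largest deal; tied deals share the better rank.
    value_rank = 1 + sum(1 for v in values if v > deal["value"])
    # Percentile: share of active deals whose value is at or below this one.
    percentile = 100.0 * sum(1 for v in values if v <= deal["value"]) / total
    return {
        "percentile": round(percentile, 1),
        "value_rank": value_rank,
        "total_active_deals": total,
        "stage_distribution": dict(Counter(d["stage"] for d in active_deals)),
        "pipeline_value": sum(values),
        # Weighted value discounts each deal by its probability (0-100).
        "weighted_pipeline_value": sum(
            d["value"] * d["probability"] / 100 for d in active_deals
        ),
    }


pipeline = [
    {"name": "A", "value": 120000, "stage": "proposal", "probability": 60},
    {"name": "B", "value": 75000, "stage": "discovery", "probability": 40},
    {"name": "C", "value": 15000, "stage": "discovery", "probability": 50},
    {"name": "D", "value": 40000, "stage": "proposal", "probability": 80},
]
metrics = deal_metrics(pipeline[1], pipeline)
```

For deal B this gives rank 2 of 4, the 75th percentile, a pipeline value of 250000, and a weighted pipeline value of 141500.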
Data Models
Deal Model
```python
from sqlalchemy import Column, DateTime, Float, ForeignKey, JSON, String
from sqlalchemy import Enum as SQLEnum

# Excerpt: Base and DealStage are defined elsewhere in the codebase.
class Deal(Base):
    id = Column(String, primary_key=True)
    tenant_id = Column(String, ForeignKey("tenants.id"))
    name = Column(String, nullable=False)
    company = Column(String)
    value = Column(Float, default=0.0)
    stage = Column(SQLEnum(DealStage), default="discovery")
    probability = Column(Float, default=50.0)
    source = Column(String, default="manual")
    expected_close_date = Column(DateTime(timezone=True))
    health_score = Column(Float, default=70.0)
    risk_level = Column(String, default="low")
    source_metadata = Column(JSON)
    created_at = Column(DateTime(timezone=True))
    updated_at = Column(DateTime(timezone=True))
```
Lead Model
```python
from sqlalchemy import Boolean, Column, Float, ForeignKey, JSON, String, Text
from sqlalchemy import Enum as SQLEnum

# Excerpt: Base and LeadStatus are defined elsewhere in the codebase.
class Lead(Base):
    id = Column(String, primary_key=True)
    tenant_id = Column(String, ForeignKey("tenants.id"))
    email = Column(String, nullable=False)
    first_name = Column(String)
    last_name = Column(String)
    company = Column(String)
    source = Column(String)
    status = Column(SQLEnum(LeadStatus), default="new")
    ai_score = Column(Float, default=0.0)
    ai_qualification_summary = Column(Text)
    is_spam = Column(Boolean, default=False)
    metadata_json = Column(JSON)
```
Usage Examples
Manual Deal Entry
```typescript
import axios from 'axios';

const response = await axios.post('/api/sales/deals/manual', {
  name: 'Enterprise License - Acme Corp',
  company: 'Acme Corporation',
  value: 120000,
  stage: 'proposal',
  probability: 60,
  expected_close_date: '2026-06-01',
  contact_email: 'ceo@acme.com',
  contact_name: 'John Smith',
  description: 'Enterprise license for 500 users'
});
```
Web Form Submission
```typescript
import axios from 'axios';

const response = await axios.post('/api/sales/deals/web-form', {
  first_name: 'Jane',
  last_name: 'Doe',
  email: 'jane@company.com',
  company: 'Company Inc',
  message: 'Interested in enterprise plan',
  product_interest: 'Enterprise License',
  company_size: '51-200'
});
```
CSV Upload
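```typescript
import axios from 'axios';

// csvFile is a File object, for example from an <input type="file"> element.
const formData = new FormData();
formData.append('file', csvFile);
const response = await axios.post('/api/sales/deals/csv/upload', formData, {
  headers: {
    'Content-Type': 'multipart/form-data'
  }
});
```
Email Extraction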
```typescript
import axios from 'axios';

const response = await axios.post('/api/sales/deals/email', {
  sender: 'prospect@company.com',
  sender_name: 'John Smith',
  subject: 'Enterprise solution inquiry',
  content: 'We are looking for...',
  company_name: 'Prospect Corp'
});
```
API Submission
```typescript
import axios from 'axios';

const response = await axios.post('/api/sales/deals/api', {
  name: 'Custom Deal Name',
  company: 'Company Name',
  value: 75000,
  stage: 'discovery',
  probability: 40,
  contact_email: 'contact@company.com',
  external_id: 'ext-12345',
  metadata: {
    integration: 'custom_system',
    source_campaign: 'spring-sale'
  }
});
```
HubSpot Integration
Webhook Setup
- Configure the HubSpot webhook to call `POST /api/sales/deals/hubspot/sync`
- Add query parameters: `workspace_id` and `tenant_id`
- Send the deal object in the request body
Batch Sync
```python
# Python example for batch sync
import requests

# hubspot_client, API_URL, workspace_id, and tenant_id are assumed to be set up elsewhere.
deals = hubspot_client.crm.deals.get_all()
response = requests.post(
    f"{API_URL}/api/sales/deals/hubspot/batch-sync",
    params={
        "workspace_id": workspace_id,
        "tenant_id": tenant_id
    },
    json=[deal.to_dict() for deal in deals]
)
```
Salesforce Integration
Webhook Setup
- Configure a Salesforce outbound message
- Endpoint: `POST /api/sales/deals/salesforce/sync`
- Send the opportunity data in the request body
Batch Sync
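```python
# Python example for batch sync
import requests

# sf is assumed to be an authenticated Salesforce client (for example, simple_salesforce);
# API_URL, workspace_id, and tenant_id are assumed to be set up elsewhere.
opps = sf.query_all("SELECT Id, Name, Amount, StageName, Probability, CloseDate FROM Opportunity")
response = requests.post(
    f"{API_URL}/api/sales/deals/salesforce/batch-sync",
    params={
        "workspace_id": workspace_id,
        "tenant_id": tenant_id
    },
    json=opps['records']
)
```
Dashboard Integration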
The SalesCommandCenter automatically:
- Displays ingested deals in real-time
- Shows AI insights for each deal
- Updates metrics on new deal creation
- Broadcasts updates via WebSocket
Error Handling
All endpoints return a consistent error format:
```json
{
  "status": "error",
  "error": "Error message",
  "source": "source_name"
}
```
Security & Multi-Tenancy
- All requests are scoped to `tenant_id`
- Workspace verification required
- No cross-tenant data access
- Audit logging for all ingestions
Performance
- **Deduplication**: O(1) external ID lookup, O(n) name/company search
- **Batch CSV**: Processes row-by-row with error isolation
- **AI Analysis**: Async, non-blocking
- **WebSocket**: Real-time updates
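
The two deduplication costs can be illustrated with an in-memory stand-in for the deal table: a dictionary gives the constant-time external ID lookup, and a linear scan handles the name/company fallback. The class `DealIndex`, the helper `_name_company_key`, and the case-insensitive, whitespace-trimmed comparison are assumptions made for this sketch; the real service presumably queries the database instead.

```python
from typing import Any, Dict, List, Optional, Tuple


def _name_company_key(deal: Dict[str, Any]) -> Tuple[str, str]:
    """Normalize name and company so matching ignores case and outer whitespace."""
    return (
        (deal.get("name") or "").strip().casefold(),
        (deal.get("company") or "").strip().casefold(),
    )


class DealIndex:
    """In-memory stand-in for the deal table, for illustration only."""

    def __init__(self) -> None:
        self._deals: List[Dict[str, Any]] = []
        self._by_external_id: Dict[str, Dict[str, Any]] = {}

    def add(self, deal: Dict[str, Any]) -> None:
        self._deals.append(deal)
        if deal.get("external_id"):
            self._by_external_id[deal["external_id"]] = deal

    def find_existing(self, incoming: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        external_id = incoming.get("external_id")
        if external_id and external_id in self._by_external_id:
            return self._by_external_id[external_id]  # O(1) dictionary lookup
        key = _name_company_key(incoming)
        for deal in self._deals:  # O(n) scan over existing deals
            if _name_company_key(deal) == key:
                return deal
        return None
```

Checking the external ID first means that integrations which supply one never pay for the slower scan on a match.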
Extensibility
To add a new source:
- Add the source to the `DealIngestionSource` enum
- Create a normalization method: `_normalize_[source]_deal()`
- Add an API endpoint in `deal_ingestion_routes.py`
- Add a frontend tab in `DealIngestionPanel.tsx`
- Update the documentation
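
The first two steps might look like the sketch below, which adds a made-up `linkedin` source. The enum values, the `DealNormalizer` class, the raw field names, and the name-based dispatch through `getattr` are assumptions used to illustrate the `_normalize_[source]_deal()` naming convention, not the actual service code.

```python
from enum import Enum
from typing import Any, Dict


class DealIngestionSource(str, Enum):
    MANUAL = "manual"
    LINKEDIN = "linkedin"  # hypothetical new source


class DealNormalizer:
    """Dispatches to _normalize_<source>_deal() based on the source value."""

    def _normalize_manual_deal(self, raw: Dict[str, Any]) -> Dict[str, Any]:
        return {"name": raw["name"], "value": float(raw.get("value", 0))}

    def _normalize_linkedin_deal(self, raw: Dict[str, Any]) -> Dict[str, Any]:
        # Raw field names here are invented for the example.
        return {"name": raw["headline"], "company": raw.get("organization"), "value": 0.0}

    def normalize(self, source: DealIngestionSource, raw: Dict[str, Any]) -> Dict[str, Any]:
        method = getattr(self, f"_normalize_{source.value}_deal", None)
        if method is None:
            raise ValueError(f"No normalizer for source {source.value!r}")
        deal = method(raw)
        deal["source"] = source.value
        return deal
```

With dispatch by name, adding a source needs only a new enum member and one new method.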
Monitoring & Logging
- All ingestions logged to database
- Error tracking with source context
- Success/failure metrics per source
- WebSocket broadcasts for monitoring
Future Enhancements
- [ ] Advanced email parsing (Gmail/Outlook API)
- [ ] Lead enrichment (Clearbit, etc.)
- [ ] Automated drip campaigns
- [ ] Predictive lead scoring
- [ ] Advanced field mapping for CRM sync
- [ ] Deal collaboration features
- [ ] Pipeline velocity tracking
- [ ] Win/loss analysis